home *** CD-ROM | disk | FTP | other *** search
/ Personal Computer World 2009 February / PCWFEB09.iso / Software / Resources / Chat & Communication / Digsby build 37 / digsby_setup.exe / lib / msn / P2P / P2PData.pyo (.txt) < prev    next >
Python Compiled Bytecode  |  2008-10-13  |  19KB  |  599 lines

  1. # Source Generated with Decompyle++
  2. # File: in.pyo (Python 2.5)
  3.  
  4. import logging
  5. import random
  6. import sys
  7. import struct
  8. import uuid
  9. import util
  10. from util import callsback, get, call_later
  11. from util.Events import EventMixin
  12.  
  13. try:
  14.     from CStringIO import StringIO
  15. except ImportError:
  16.     from StringIO import StringIO
  17.  
  18. log = logging.getLogger('msn.p2p.data')
  19.  
  20. flagged = lambda v, f: f & v == f
  21.  
  22. randid = lambda : random.randint(4, sys.maxint // 2 - 5)
  23.  
  24. class Flags:
  25.     names = {
  26.         0: 'none',
  27.         1: 'sync',
  28.         2: 'ack',
  29.         4: 'wait',
  30.         8: 'err',
  31.         32: 'data',
  32.         64: 'byea',
  33.         128: 'byem',
  34.         16777264: 'file',
  35.         256: 'dchs' }
  36.     NONE = 0
  37.     ONE = 1
  38.     UNKNOWN = ONE
  39.     SYNC = ONE
  40.     ACK = 2
  41.     WAIT = 4
  42.     ERROR = 8
  43.     DATA = 32
  44.     BYEACK = 64
  45.     BYEMSG = 128
  46.     FILE = 16777264
  47.     HANDSHAKE = 256
  48.     DCHS = HANDSHAKE
  49.  
  50. Header = util.new_packable(('session', 'I', 'msgid', 'I', 'offset', 'Q', 'total', 'Q', 'length', 'I', 'flags', 'I', 'msgid_ack', 'I', 'msgid_ackack', 'I', 'total_ack', 'Q'), byteorder = '<')
  51.  
  52. class P2PMessage(object):
  53.     
  54.     def __init__(self, sender, recipient, id, flags, session_id, app_id, size, content, acked_msg_id = None, prev_acked_msg_id = 0, acked_data_size = 0):
  55.         self.sender = get(sender, 'name', sender)
  56.         self.recipient = get(recipient, 'name', recipient)
  57.         self.content = content
  58.         if acked_msg_id is None:
  59.             acked_msg_id = randid()
  60.         
  61.         self.header = Header(session = session_id, msgid = id, offset = 0, total = size, length = 0, flags = flags, msgid_ack = acked_msg_id, msgid_ackack = prev_acked_msg_id, total_ack = acked_data_size)
  62.         self.app_id = self.footer = app_id
  63.         self.transferred = 0
  64.  
  65.     
  66.     def reset(self):
  67.         if self.content is not None:
  68.             
  69.             try:
  70.                 self.content.seek(0)
  71.             except ValueError:
  72.                 pass
  73.             except:
  74.                 None<EXCEPTION MATCH>ValueError
  75.             
  76.  
  77.         None<EXCEPTION MATCH>ValueError
  78.  
  79.     
  80.     def write(self, data):
  81.         self.content.write(data)
  82.         self.transferred = self.content.tell()
  83.  
  84.     
  85.     def seek(self, position):
  86.         self.content.seek(position)
  87.  
  88.     
  89.     def read(self, max_size):
  90.         if self.content is not None:
  91.             if self.content.closed:
  92.                 log.error('Read called on a message with closed content! wtf.')
  93.                 return None
  94.             
  95.             data = self.content.read(max_size)
  96.             self.transferred = self.content.tell()
  97.         else:
  98.             data = ''
  99.         return data
  100.  
  101.     
  102.     def tell(self):
  103.         return self.transferred
  104.  
  105.     
  106.     def size(self):
  107.         return self.header.total
  108.  
  109.     size = property(size)
  110.     
  111.     def complete(self):
  112.         return self.transferred == self.size
  113.  
  114.     complete = property(complete)
  115.     
  116.     def __hash__(self):
  117.         return hash((self.sender, self.recipient, self.header.pack()))
  118.  
  119.     
  120.     def __repr__(self):
  121.         content = self.content
  122.         if isinstance(content, StringIO):
  123.             content = content.getvalue()
  124.         
  125.         contentstr = ''
  126.         if content:
  127.             
  128.             try:
  129.                 contentstr = 'content=%r' % content[:30]
  130.             except Exception:
  131.                 pass
  132.             except:
  133.                 None<EXCEPTION MATCH>Exception
  134.             
  135.  
  136.         None<EXCEPTION MATCH>Exception
  137.         return '<%s session=%d msgid=%d size=%d offset=%d total=%d flags=%d type(content)=%r %s>' % (type(self).__name__, self.header.session, self.header.msgid, self.header.length, self.tell(), self.size, self.header.flags, type(content).__name__, contentstr)
  138.  
  139.  
  140.  
  141. class P2PTransport(EventMixin):
  142.     events = EventMixin.events | set(('contacts_changed', 'recv_data', 'send_data'))
  143.     
  144.     def __init__(self, client):
  145.         EventMixin.__init__(self)
  146.         client._p2p_manager._register_transport(self)
  147.         self.p2p_clients = 0
  148.  
  149.     
  150.     def p2p_peers(self):
  151.         raise NotImplementedError
  152.  
  153.     p2p_peers = property(p2p_peers)
  154.     
  155.     def p2p_rating(self):
  156.         raise NotImplementedError
  157.  
  158.     p2p_rating = property(p2p_rating)
  159.     
  160.     def p2p_max_msg_size(self):
  161.         raise NotImplementedError
  162.  
  163.     p2p_max_msg_size = property(p2p_max_msg_size)
  164.     
  165.     def p2p_send(self, data, callback = None):
  166.         raise NotImplementedError
  167.  
  168.     p2p_send = callsback(p2p_send)
  169.     
  170.     def p2p_overhead(self):
  171.         raise NotImplementedError
  172.  
  173.     p2p_overhead = property(p2p_overhead)
  174.     
  175.     def build_data(self, header, body, footer):
  176.         raise NotImplementedError
  177.  
  178.     
  179.     def __repr__(self):
  180.         return '<%s p2pclients=%r id=%d>' % (type(self).__name__, get(self, 'p2p_clients', None), id(self))
  181.  
  182.  
  183.  
  184. class P2PManager(EventMixin):
  185.     events = EventMixin.events | set(('recv_msg_start', 'recv_msg_end', 'recv_error', 'send_msg_start', 'send_msg_end', 'send_error', 'recv_data', 'send_data'))
  186.     
  187.     def __init__(self, client):
  188.         EventMixin.__init__(self)
  189.         self.client = client
  190.         self._transports = []
  191.         self._P2PManager__incoming = { }
  192.         self._P2PManager__outgoing = { }
  193.         self._P2PManager__sent = { }
  194.         self._P2PManager__last_acked = None
  195.         self.sort_transports()
  196.  
  197.     
  198.     def close_all(self):
  199.         for transport in self._transports:
  200.             
  201.             try:
  202.                 transport.Disconnect()
  203.             continue
  204.             import traceback as traceback
  205.             traceback.print_exc()
  206.             continue
  207.  
  208.         
  209.  
  210.     
  211.     def _outgoing(self):
  212.         return self._P2PManager__outgoing
  213.  
  214.     _outgoing = property(_outgoing)
  215.     
  216.     def sort_transports(self):
  217.         self._best = { }
  218.         for transport in self._transports:
  219.             rating = transport.p2p_rating
  220.             for peer in transport.p2p_peers:
  221.                 if peer in self._best:
  222.                     prev_rating = self._best[peer].p2p_rating
  223.                     if rating > prev_rating:
  224.                         self._best[peer] = transport
  225.                     
  226.                 rating > prev_rating
  227.                 self._best[peer] = transport
  228.             
  229.         
  230.  
  231.     
  232.     def _register_transport(self, transport):
  233.         
  234.         try:
  235.             v = transport._P2PManager__registered
  236.         except AttributeError:
  237.             v = False
  238.  
  239.         if v:
  240.             log.info('Transport was already registered. Returning from register.')
  241.             return None
  242.         
  243.         bind = transport.bind
  244.         bind('contacts_changed', self.transport_sorter)
  245.         bind('recv_data', self._on_recv_data)
  246.         bind('send_data', self._on_send_data)
  247.         self._transports.append(transport)
  248.         transport._P2PManager__registered = True
  249.         self.sort_transports()
  250.  
  251.     
  252.     def _unregister_transport(self, transport):
  253.         
  254.         try:
  255.             v = transport._P2PManager__registered
  256.         except AttributeError:
  257.             v = False
  258.  
  259.         if not v:
  260.             log.info('Transport was not registered. Returning from unregister.')
  261.             return None
  262.         
  263.         log.debug('P2PManager removing transport %r', transport)
  264.         unbind = transport.unbind
  265.         unbind('contacts_changed', self.transport_sorter)
  266.         unbind('recv_data', self._on_recv_data)
  267.         unbind('send_data', self._on_send_data)
  268.         transport._P2PManager__registered = False
  269.         self._transports.remove(transport)
  270.         self.sort_transports()
  271.  
  272.     
  273.     def transport_sorter(self, *a):
  274.         self.sort_transports()
  275.  
  276.     
  277.     def get_best(self, peer, callback = None):
  278.         
  279.         try:
  280.             return self._best[peer]
  281.         except KeyError:
  282.             log.info('No transport found for %s, returning default (bests: %r)', peer, self._best)
  283.             return self.client._get_default_p2p_transport(peer, callback = callback)
  284.  
  285.  
  286.     get_best = callsback(get_best)
  287.     
  288.     def _on_send_data(self):
  289.         self.event('send_data')
  290.  
  291.     
  292.     def _on_recv_data(self, transport, sender, data, has_footer = True):
  293.         
  294.         try:
  295.             header = Header.unpack(data[:48])
  296.             data = data[48:]
  297.         except Exception:
  298.             e = None
  299.             print repr(data)
  300.             raise 
  301.  
  302.         log.debug('P2PManager got message(flags=%d) from %r via %r', header.flags, sender, transport)
  303.         if has_footer:
  304.             footer = struct.unpack('>L', data[-4:])[0]
  305.             data = data[:-4]
  306.         else:
  307.             footer = 0
  308.         if footer == 0 and data == '\x00\x00\x00\x00':
  309.             footer = 1
  310.         
  311.         
  312.         try:
  313.             pass
  314.         except:
  315.             import traceback
  316.             traceback.print_exc()
  317.             raise 
  318.  
  319.         if header.total == header.offset + header.length:
  320.             log.critical('Got completed P2PMessage with flags %d (%s)', header.flags, get(Flags.names, header.flags, 'Super duper unknown flags %d' % (header.flags,)))
  321.         
  322.         if flagged(header.flags, Flags.ACK):
  323.             self._P2PManager__last_acked = header.msgid_ack
  324.             return self._process_ack(header.msgid_ack)
  325.         elif flagged(header.flags, Flags.ERROR):
  326.             log.info('Got binary transport error')
  327.             if header.msgid_ack in self._P2PManager__outgoing:
  328.                 self._P2PManager__outgoing[header.msgid_ack].on_done()
  329.             
  330.             self.event('recv_error')
  331.             return None
  332.         elif flagged(header.flags, Flags.SYNC):
  333.             
  334.             try:
  335.                 sent = self._P2PManager__outgoing[header.msgid_ack]
  336.             except Exception:
  337.                 e = None
  338.  
  339.             log.error('P2P sync received: %s', list(header))
  340.             log.error('Error acks total %d, my message says offset %d', header.total_ack, sent.msg.header.offset)
  341.             sent.msg.seek(header.total_ack - sent.msg.header.length)
  342.             return None
  343.         elif flagged(header.flags, Flags.HANDSHAKE):
  344.             log.warning('Got Direct Connect Handshake Message (DCHS) header=<%r>', list(header))
  345.             their_nonce = uuid.UUID(bytes_le = header.pack()[-16:])
  346.             msn_hash = msn_hash
  347.             import SLPCalls
  348.             None(None, log.warning, 'Their %shashed Nonce (unhashed: %s, hashed: %s)' if transport.in_key is None else '', their_nonce if not transport.in_key else their_nonce)
  349.             log.warning('My unhashed Nonce (unhashed: %s, hashed: %s)', transport.out_key, transport.out_hkey)
  350.             if transport.in_key is None:
  351.                 transport.in_key = their_nonce
  352.             elif transport.in_hkey is None:
  353.                 transport.in_hkey = their_nonce
  354.             else:
  355.                 log.info('Had all the nonces already. (How did that happen?)')
  356.             transport._send_nonce(header.msgid, header.msgid_ack)
  357.             transport.event('on_ready')
  358.             return None
  359.         elif flagged(header.flags, Flags.BYEACK):
  360.             log.info('Got ack for BYE message. Going to send waiting flags')
  361.         elif flagged(header.flags, Flags.WAIT):
  362.             log.critical("Got waiting message. Here's the header: %r", list(header))
  363.             msg = P2PMessage(None, sender, header.msgid, 6, header.session, footer, 0, None)
  364.             self.send_message(msg)
  365.             return None
  366.         
  367.         id = header.msgid
  368.         if id not in self._P2PManager__incoming:
  369.             log.info('Got new message')
  370.             msg = self._process_new(sender, header, footer)
  371.             transport.p2p_clients += 1
  372.         else:
  373.             log.debug('Continuing message with id=(%r)', id)
  374.             msg = self._P2PManager__incoming[id]
  375.         
  376.         try:
  377.             if header.offset != msg.tell():
  378.                 log.warning('Header offset does not match file offset')
  379.             
  380.             msg.seek(header.offset)
  381.             msg.write(data)
  382.         except:
  383.             self.event('recv_error')
  384.             return None
  385.  
  386.         if msg.complete:
  387.             log.info('Received message')
  388.             transport.p2p_clients -= 1
  389.             self._process_msg(header, footer, msg)
  390.         
  391.         self.event('recv_data')
  392.  
  393.     
  394.     def _process_ack(self, id):
  395.         if id in self._P2PManager__sent:
  396.             msg = self._P2PManager__sent.pop(id)
  397.             log.info('Got ack for message, NOT resetting it: %r', msg)
  398.             self.event('send_msg_end', msg)
  399.         else:
  400.             log.error('got ack for unknown message')
  401.  
  402.     
  403.     def _process_new(self, sender, header, footer):
  404.         content = self.client.slp_call_master._create_message_content(header, footer)
  405.         msg = P2PMessage(sender, None, header.msgid, header.flags, header.session, footer, header.total, content)
  406.         self._P2PManager__incoming[header.msgid] = msg
  407.         self.event('recv_msg_start', msg)
  408.         return msg
  409.  
  410.     
  411.     def _process_msg(self, header, footer, msg):
  412.         log.info('P2P message complete (%s)', msg)
  413.         self.send_ack(header, footer, msg)
  414.         msg.reset()
  415.         
  416.         try:
  417.             self.event('recv_msg_end', self, msg)
  418.         except Exception:
  419.             e = None
  420.             import traceback
  421.             traceback.print_stack()
  422.             traceback.print_exc()
  423.  
  424.         del self._P2PManager__incoming[header.msgid]
  425.  
  426.     
  427.     def send_ack(self, header, footer, msg):
  428.         
  429.         try:
  430.             flags = None if flagged(header.flags, Flags.BYEMSG) else Flags.ACK
  431.             log.info('Going to send ack with flags %s', flags)
  432.             
  433.             try:
  434.                 id = self._P2PManager__last_acked + 1
  435.             except:
  436.                 id = randid()
  437.             finally:
  438.                 self._P2PManager__last_acked = None
  439.  
  440.             sender = get(msg, 'sender')
  441.             ack_msg = P2PMessage(None, sender, id, flags, header.session, footer, header.total, None, header.msgid, header.msgid_ack, header.total)
  442.             self.send_message(ack_msg)
  443.         except Exception:
  444.             e = None
  445.             import traceback
  446.             traceback.print_exc()
  447.             raise e
  448.  
  449.  
  450.     
  451.     def send_with_producer(self, msg, callback = None):
  452.         if msg.header.msgid not in self._P2PManager__outgoing:
  453.             log.warning('Sending producer for msg with id %d', msg.header.msgid)
  454.             self.event('send_msg_start', msg)
  455.             prod = self.make_producer(msg, callback = callback)
  456.             self._P2PManager__outgoing[msg.header.msgid] = prod
  457.             self._send_producer(prod)
  458.         else:
  459.             log.warning('Got producer for %d again (???). Not sending it.', msg.header.msgid)
  460.             return None
  461.  
  462.     send_with_producer = callsback(send_with_producer)
  463.     send_message = send_with_producer
  464.     
  465.     def _send_producer(self, prod):
  466.         
  467.         try:
  468.             if not prod.transport._P2PManager__registered:
  469.                 self._register_transport(prod.transport)
  470.         except Exception:
  471.             e = None
  472.             import traceback
  473.             traceback.print_exc()
  474.             raise e
  475.  
  476.         
  477.         try:
  478.             prod.push()
  479.         except TypeError:
  480.             e = None
  481.             log.info("Can't send this producer (%r) on its transport. Need to find a new transport.", prod)
  482.             import traceback
  483.             traceback.print_exc()
  484.  
  485.  
  486.     
  487.     def make_producer(self, msg, callback = None):
  488.         log.info('P2PData making a producer for %r', msg)
  489.         prod = P2PProducer(self, msg, callback.success, callback.error)
  490.         return prod
  491.  
  492.     make_producer = callsback(make_producer)
  493.     
  494.     def _unqueue(self, msg):
  495.         del self._P2PManager__outgoing[msg.header.msgid]
  496.         if not flagged(msg.header.flags, Flags.ACK):
  497.             self._P2PManager__sent[msg.header.msgid] = msg
  498.         
  499.  
  500.  
  501.  
  502. class P2PProducer(object):
  503.     
  504.     def __init__(self, master, msg, whendone = None, on_error = None):
  505.         self.master = master
  506.         self.msg = msg
  507.         self._transport = self.master.get_best(self.msg.recipient)
  508.         self._transport.p2p_clients += 1
  509.         self._P2PProducer__finished = False
  510.         
  511.         def donothing():
  512.             pass
  513.  
  514.         self._oncomplete = self._transport if whendone is not None else donothing
  515.         self._onerror = None if on_error is not None else donothing
  516.         self.master.bind('send_msg_end', self.on_ack)
  517.  
  518.     
  519.     def transport(self):
  520.         if self._P2PProducer__finished:
  521.             raise AttributeError
  522.         
  523.         self._transport.p2p_clients -= 1
  524.         old_xport = self._transport
  525.         self._transport = self.master.get_best(self.msg.recipient, error = self._onerror)
  526.         self._transport.p2p_clients += 1
  527.         return self._transport
  528.  
  529.     transport = property(transport)
  530.     
  531.     def recipient(self):
  532.         return self.msg.recipient
  533.  
  534.     recipient = property(recipient)
  535.     
  536.     def more(self):
  537.         if self._P2PProducer__finished:
  538.             log.info('Message %r complete or cancelled, returning None', self.msg)
  539.             return None
  540.         
  541.         msg = self.msg
  542.         transport = self.transport
  543.         if hasattr(transport, '_super_secret_msgid'):
  544.             msg.header.msgid_ack = transport._super_secret_msgid
  545.         
  546.         (header, body, footer) = _next_msg(msg, transport.p2p_max_msg_size - transport.p2p_overhead)
  547.         if body == body and footer == footer:
  548.             pass
  549.         elif footer == None:
  550.             if not msg.complete:
  551.                 log.error('Message %r is not complete but had no more data', msg)
  552.             else:
  553.                 log.warning('Message %r complete. Returning None')
  554.             self.on_done()
  555.             return None
  556.         
  557.         data = transport.build_data(header, body, footer)
  558.         transport.event('send_data')
  559.         return data
  560.  
  561.     
  562.     def on_ack(self, msg):
  563.         if msg.header.msgid == self.msg.header.msgid:
  564.             log.info('Finished producer calling %r', self._oncomplete)
  565.             self._oncomplete()
  566.             self.master.unbind('send_msg_end', self.on_ack)
  567.         
  568.  
  569.     
  570.     def on_done(self):
  571.         log.info('Producer finished')
  572.         self._P2PProducer__finished = True
  573.         self.master._unqueue(self.msg)
  574.         self._transport.p2p_clients -= 1
  575.         del self._transport
  576.  
  577.     
  578.     def push(self):
  579.         transport = self.transport
  580.         transport.push_with_producer(self, error = self._onerror)
  581.  
  582.     
  583.     def __repr__(self):
  584.         return None % ('<%s message=%r, %s>', type(self).__name__, self.msg if self._P2PProducer__finished else 'transport=%r' % self._transport)
  585.  
  586.  
  587.  
  588. def _next_msg(msg, size):
  589.     header = msg.header
  590.     header.offset = msg.transferred
  591.     body = msg.read(size)
  592.     if body is None:
  593.         return (None, None, None)
  594.     
  595.     blen = len(body)
  596.     header.length = blen
  597.     return (header.pack(), body, struct.pack('>L', msg.footer))
  598.  
  599.